package com.mars.module.admin.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.mars.common.base.UserContextInfo;
import com.mars.common.response.PageInfo;
import com.mars.framework.async.AsyncFactory;
import com.mars.framework.context.ContextUserInfoThreadHolder;
import com.mars.framework.exception.ServiceException;
import com.mars.framework.websocket.Message;
import com.mars.framework.websocket.WebSocketServer;
import com.mars.module.admin.entity.SysMessage;
import com.mars.module.admin.entity.SysUserMessageStats;
import com.mars.module.admin.mapper.SysMessageMapper;
import com.mars.module.admin.mapper.SysUserMessageStatsMapper;
import com.mars.module.admin.request.MsgRequest;
import com.mars.module.system.entity.SysUser;
import com.mars.module.system.mapper.SysUserMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import com.mars.module.admin.request.SysNotifyRequest;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.mars.module.admin.mapper.SysNotifyMapper;
import org.springframework.beans.BeanUtils;
import com.mars.module.admin.entity.SysNotify;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.mars.module.admin.service.ISysNotifyService;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Service implementation for notification announcements.
 *
 * <p>Provides CRUD and paging for {@link SysNotify} records, and fans a notification out to
 * users: a {@link SysMessage} row is stored per recipient, the recipient's
 * {@link SysUserMessageStats} unread counter is incremented, and recipients that are currently
 * connected additionally receive a WebSocket push.
 *
 * <p>NOTE(review): unread counters are updated with a read-modify-write sequence inside
 * asynchronous tasks; concurrent notifications for the same user may lose an increment.
 *
 * @author mars
 * @date 2023-12-06
 */
@Slf4j
@Service
@AllArgsConstructor
public class SysNotifyServiceImpl implements ISysNotifyService {

    /** Delivery type: the message targets an explicit list of user ids. */
    private static final int TYPE_TARGETED = 1;

    /** Delivery type: announcement for every user that is NOT in the supplied (online) list. */
    private static final int TYPE_BROADCAST = 2;

    private final SysNotifyMapper baseMapper;

    private final SysUserMapper sysUserMapper;

    private final SysUserMessageStatsMapper sysUserMessageStatsMapper;

    private final SysMessageMapper sysMessageMapper;

    private final WebSocketServer webSocketServer;

    /**
     * Copies the request onto a new {@link SysNotify} and inserts it.
     *
     * @param request source of the property values
     * @return the entity that was passed to the mapper's insert
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public SysNotify add(SysNotifyRequest request) {
        SysNotify entity = SysNotify.builder().build();
        BeanUtils.copyProperties(request, entity);
        baseMapper.insert(entity);
        return entity;
    }

    /**
     * Deletes one notification by primary key.
     *
     * @param id primary key
     * @return {@code true} when at least one row was deleted
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean delete(Long id) {
        return baseMapper.deleteById(id) > 0;
    }

    /**
     * Deletes several notifications by primary key.
     *
     * @param ids primary keys; {@code null} or empty deletes nothing
     * @return {@code true} when at least one row was deleted
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean deleteBatch(List<Long> ids) {
        // An empty id list has nothing to delete; skip the mapper call rather than
        // sending an id-less batch statement to the database.
        if (CollectionUtils.isEmpty(ids)) {
            return false;
        }
        return baseMapper.deleteBatchIds(ids) > 0;
    }

    /**
     * Copies the request onto a {@link SysNotify} and updates it by id.
     *
     * @param request source of the property values (including the id)
     * @return {@code true} when at least one row was updated
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean update(SysNotifyRequest request) {
        SysNotify entity = SysNotify.builder().build();
        BeanUtils.copyProperties(request, entity);
        return baseMapper.updateById(entity) > 0;
    }

    /**
     * Loads one notification by primary key.
     *
     * @param id primary key
     * @return whatever the mapper returns for that id
     */
    @Override
    public SysNotify getById(Long id) {
        return baseMapper.selectById(id);
    }

    /**
     * Pages notifications using the optional filters carried by the request
     * (title, type, content, remark).
     *
     * @param request paging parameters and filters
     * @return the page wrapped as {@link PageInfo}
     */
    @Override
    public PageInfo<SysNotify> pageList(SysNotifyRequest request) {
        Page<SysNotify> page = new Page<>(request.getPageNo(), request.getPageSize());
        LambdaQueryWrapper<SysNotify> query = this.buildWrapper(request);
        IPage<SysNotify> pageRecord = baseMapper.selectPage(page, query);
        return PageInfo.build(pageRecord);
    }

    /**
     * Sends a notification asynchronously.
     *
     * <p>When {@code param.getToUserIds()} is non-empty only those users are addressed;
     * otherwise the notification is an announcement for all users. In both cases the
     * recipients are split into "online" (per the WebSocket server) and "offline", and each
     * group is handled by its own asynchronous task.
     *
     * @param param notification content, title and optional recipient ids
     * @throws ServiceException when the content is empty
     */
    @Override
    public void sendNotify(SysNotifyRequest param) {
        if (StringUtils.isEmpty(param.getContent())) {
            throw new ServiceException("通知内容不能为空");
        }
        UserContextInfo userInfo = ContextUserInfoThreadHolder.get();
        List<Long> toUserIds = param.getToUserIds();

        // Snapshot the online list: the async tasks below must see a stable view, and a
        // null result is treated as "nobody online".
        List<Long> reportedOnline = webSocketServer.getOnLineUserIds();
        List<Long> onLineUserIds = reportedOnline == null
                ? new ArrayList<>()
                : new ArrayList<>(reportedOnline);

        if (CollectionUtils.isNotEmpty(toUserIds)) {
            // Targeted delivery: partition the requested recipients once instead of
            // spawning one task per user. Null and duplicate ids are dropped.
            Set<Long> onlineLookup = new HashSet<>(onLineUserIds);
            List<Long> onlineTargets = new ArrayList<>();
            List<Long> offlineTargets = new ArrayList<>();
            for (Long userId : new LinkedHashSet<>(toUserIds)) {
                if (userId == null) {
                    continue;
                }
                if (onlineLookup.contains(userId)) {
                    onlineTargets.add(userId);
                } else {
                    offlineTargets.add(userId);
                }
            }
            if (!onlineTargets.isEmpty()) {
                AsyncFactory.runAsync(() -> this.executeOnlineUserMsg(param, userInfo, onlineTargets));
            }
            if (!offlineTargets.isEmpty()) {
                AsyncFactory.runAsync(() -> this.executeUnLineUserMsg(param, userInfo, offlineTargets, TYPE_TARGETED));
            }
        } else {
            // Announcement: online users get the push, and every other user gets the stored
            // message. Both tasks run so offline users are not skipped while someone is online.
            if (!onLineUserIds.isEmpty()) {
                AsyncFactory.runAsync(() -> this.executeOnlineUserMsg(param, userInfo, onLineUserIds));
            }
            AsyncFactory.runAsync(() -> this.executeUnLineUserMsg(param, userInfo, onLineUserIds, TYPE_BROADCAST));
        }
    }

    /**
     * Stores one unread {@link SysMessage} per recipient, sent by the current user.
     * Does nothing (besides logging) when the request has no recipients.
     *
     * @param request title, content and recipient ids
     */
    @Override
    public void sendMsg(MsgRequest request) {
        if (CollectionUtils.isEmpty(request.getToUserIds())) {
            log.warn("sendMsg called without recipients; nothing to send");
            return;
        }
        UserContextInfo userInfo = ContextUserInfoThreadHolder.get();
        List<SysMessage> messageList = request.getToUserIds().stream().map(x -> SysMessage.builder()
                .content(request.getContent()).sender(userInfo.getId())
                .receiver(x).status(0)
                .senderName(userInfo.getUserName())
                .title(request.getTitle())
                .build()).collect(Collectors.toList());
        sysMessageMapper.insertBatchSomeColumn(messageList);
    }

    /**
     * Handles recipients that are not connected: stores the message and bumps the unread
     * counter, without any WebSocket push.
     *
     * @param param         notification being sent
     * @param userInfo      sender
     * @param onLineUserIds for {@link #TYPE_TARGETED}: the ids to deliver to;
     *                      for any other type: the ids to EXCLUDE (the online users)
     * @param type          {@code 1} deliver to the given users, {@code 2} announcement
     */
    private void executeUnLineUserMsg(SysNotifyRequest param, UserContextInfo userInfo, List<Long> onLineUserIds, int type) {
        LambdaQueryWrapper<SysUser> userQuery = Wrappers.lambdaQuery(SysUser.class);
        if (type == TYPE_TARGETED) {
            if (CollectionUtils.isEmpty(onLineUserIds)) {
                return;
            }
            userQuery.in(SysUser::getId, onLineUserIds);
        } else if (CollectionUtils.isNotEmpty(onLineUserIds)) {
            // Only add the exclusion when there is something to exclude; with nobody online
            // the announcement goes to every user.
            userQuery.notIn(SysUser::getId, onLineUserIds);
        }

        List<SysUser> userList = sysUserMapper.selectList(userQuery);
        if (CollectionUtils.isEmpty(userList)) {
            return;
        }
        List<Long> userIds = userList.stream().map(SysUser::getId).collect(Collectors.toList());
        this.saveBatchMsg(param, userInfo, userIds);

        // Decide per user: a user without a stats row gets one created, everyone else
        // has the existing counter incremented.
        Map<Long, SysUserMessageStats> statsByUser = this.loadStatsByUser(userIds);
        List<SysUserMessageStats> toCreate = new ArrayList<>();
        for (Long userId : userIds) {
            SysUserMessageStats stats = statsByUser.get(userId);
            if (stats == null) {
                toCreate.add(SysUserMessageStats.builder().userId(userId).unRead(1).build());
            } else {
                stats.setUnRead(nextUnread(stats));
                sysUserMessageStatsMapper.updateById(stats);
            }
        }
        if (!toCreate.isEmpty()) {
            sysUserMessageStatsMapper.insertBatchSomeColumn(toCreate);
        }
    }

    /**
     * Handles connected recipients: stores the message, pushes it over WebSocket together
     * with the new unread count, then persists that count.
     *
     * @param param         notification being sent
     * @param userInfo      sender
     * @param onLineUserIds ids of the connected recipients
     */
    private void executeOnlineUserMsg(SysNotifyRequest param, UserContextInfo userInfo, List<Long> onLineUserIds) {
        if (CollectionUtils.isEmpty(onLineUserIds)) {
            return;
        }
        this.saveBatchMsg(param, userInfo, onLineUserIds);

        Map<Long, SysUserMessageStats> statsByUser = this.loadStatsByUser(onLineUserIds);
        // Sequential on purpose: each iteration performs WebSocket and database I/O.
        for (Long userId : onLineUserIds) {
            SysUserMessageStats stats = statsByUser.get(userId);
            int unread = nextUnread(stats);

            Message message = Message.builder().receiverId(userId)
                    .msgNumber(unread).content(param.getContent())
                    .senderId(userInfo.getId()).build();
            webSocketServer.sendOneMessage(message);

            if (stats == null) {
                // First message for this user: create the counter row.
                SysUserMessageStats created = SysUserMessageStats.builder().userId(userId).unRead(unread).build();
                sysUserMessageStatsMapper.insert(created);
                // Remember it so a repeated id in the list does not insert a second row.
                statsByUser.put(userId, created);
            } else {
                stats.setUnRead(unread);
                sysUserMessageStatsMapper.updateById(stats);
            }
        }
    }

    /**
     * Loads the unread-counter rows for the given users, keyed by user id.
     * If several rows exist for one user the first one returned is kept.
     *
     * @param userIds non-empty list of user ids
     * @return mutable map of user id to stats row; users without a row are absent
     */
    private Map<Long, SysUserMessageStats> loadStatsByUser(List<Long> userIds) {
        List<SysUserMessageStats> rows = sysUserMessageStatsMapper.selectList(
                Wrappers.lambdaQuery(SysUserMessageStats.class).in(SysUserMessageStats::getUserId, userIds));
        if (CollectionUtils.isEmpty(rows)) {
            return new HashMap<>();
        }
        return rows.stream().collect(Collectors.toMap(
                SysUserMessageStats::getUserId, Function.identity(), (first, duplicate) -> first, HashMap::new));
    }

    /**
     * Computes the unread count after one more message.
     *
     * @param stats existing counter row, or {@code null} when the user has none
     * @return {@code 1} for a missing row or missing count, otherwise the current count plus one
     */
    private static int nextUnread(SysUserMessageStats stats) {
        if (stats == null) {
            return 1;
        }
        Integer current = stats.getUnRead();
        return (current == null ? 0 : current) + 1;
    }

    /**
     * Stores one unread {@link SysMessage} per receiver in a single batch insert.
     *
     * @param param         notification supplying title and content
     * @param userInfo      sender
     * @param onLineUserIds receiver ids; {@code null} or empty stores nothing
     */
    private void saveBatchMsg(SysNotifyRequest param, UserContextInfo userInfo, List<Long> onLineUserIds) {
        if (CollectionUtils.isEmpty(onLineUserIds)) {
            return;
        }
        List<SysMessage> messageList = onLineUserIds.stream().map(x -> SysMessage.builder()
                .content(param.getContent()).sender(userInfo.getId())
                .receiver(x).status(0)
                .senderName(userInfo.getUserName())
                .title(param.getTitle())
                .build()).collect(Collectors.toList());
        sysMessageMapper.insertBatchSomeColumn(messageList);
    }

    /**
     * Builds the paging filter: LIKE on title, content and remark when non-blank,
     * equality on type when non-null.
     *
     * @param param request carrying the optional filters
     * @return the query wrapper
     */
    private LambdaQueryWrapper<SysNotify> buildWrapper(SysNotifyRequest param) {
        LambdaQueryWrapper<SysNotify> query = new LambdaQueryWrapper<>();
        if (StringUtils.isNotBlank(param.getTitle())) {
            query.like(SysNotify::getTitle, param.getTitle());
        }
        if (param.getType() != null) {
            query.eq(SysNotify::getType, param.getType());
        }
        if (StringUtils.isNotBlank(param.getContent())) {
            query.like(SysNotify::getContent, param.getContent());
        }
        if (StringUtils.isNotBlank(param.getRemark())) {
            query.like(SysNotify::getRemark, param.getRemark());
        }
        return query;
    }

}
